mkmeral
Contributor

@mkmeral mkmeral commented Oct 2, 2025

Description

This PR adds streaming support to the Swarm and Graph multi-agent systems, enabling real-time event emission during multi-agent execution. This brings the multi-agent systems to feature parity with the streaming capabilities of the single Agent class.

Key Changes

New Event Types (src/strands/types/_events.py):

  • MultiAgentNodeStartEvent: Emitted when a node begins execution
  • MultiAgentNodeCompleteEvent: Emitted when a node completes execution
  • MultiAgentNodeStreamEvent: Forwards agent events with node context
  • MultiAgentHandoffEvent: Emitted during agent handoffs in Swarm (includes from_node, to_node, and message)

Swarm Streaming (src/strands/multiagent/swarm.py):

  • Added stream_async() method that yields events during execution
  • Refactored invoke_async() to use stream_async() internally (maintains backward compatibility)
  • Events include node start/complete, forwarded agent events, handoff notifications, and final result
  • Proper event emission even during failures

Graph Streaming (src/strands/multiagent/graph.py):

  • Added stream_async() method for real-time event streaming
  • Refactored invoke_async() to consume stream_async() events
  • Supports streaming from parallel node execution
  • Events maintain node context throughout execution
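
As an illustration of the refactor described in the two lists above, here is a minimal sketch of an invoke method that drains the stream and keeps only the final result. ToyOrchestrator and the "type" keys are hypothetical stand-ins, not the strands classes or their event shapes; only the "result" key mirrors the check quoted in the review below.

```python
import asyncio
from typing import Any, AsyncIterator


class ToyOrchestrator:
    """Hypothetical stand-in for Swarm/Graph; not the real strands classes."""

    def __init__(self, node_ids: list[str]) -> None:
        self.node_ids = node_ids

    async def stream_async(self, task: str) -> AsyncIterator[dict[str, Any]]:
        """Yield progress events, then one final event carrying the result."""
        outputs: dict[str, str] = {}
        for node_id in self.node_ids:
            yield {"type": "node_start", "node_id": node_id}  # assumed shape
            await asyncio.sleep(0)  # stand-in for real agent work
            outputs[node_id] = f"{node_id} handled {task!r}"
            yield {"type": "node_complete", "node_id": node_id}  # assumed shape
        yield {"result": outputs}

    async def invoke_async(self, task: str) -> dict[str, str]:
        """Non-streaming entry point built on top of stream_async."""
        result = None
        async for event in self.stream_async(task):
            if "result" in event:
                result = event["result"]
        if result is None:
            raise RuntimeError("stream ended without a result event")
        return result
```

Callers that only want the final answer keep using invoke_async unchanged, while callers that want progress updates iterate stream_async directly.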

Testing:

  • Comprehensive test coverage for streaming functionality in both Swarm and Graph
  • Tests for parallel execution, handoffs, failures, and timeouts
  • Backward compatibility tests to ensure existing code continues to work

Benefits

  • Real-time visibility into multi-agent execution progress
  • Consistent streaming API across single and multi-agent systems
  • Better debugging and monitoring capabilities
  • Foundation for UI progress indicators and live updates

Related Issues

Documentation PR

Type of Change

New feature

Testing

How have you tested the change?

  • Added comprehensive unit tests for streaming in both Swarm and Graph (tests/strands/multiagent/test_swarm.py, tests/strands/multiagent/test_graph.py)
  • Tests cover: basic streaming, parallel execution, handoffs, failures, timeouts, and backward compatibility
  • All existing tests pass, confirming backward compatibility

Verify that the changes do not break functionality or introduce warnings in consuming repositories: agents-docs, agents-tools, agents-cli

  • I ran hatch run prepare

Checklist

  • I have read the CONTRIBUTING document
  • I have added any necessary tests that prove my fix is effective or my feature works
  • I have updated the documentation accordingly
  • I have added an appropriate example to the documentation to outline the feature, or no new docs are needed
  • My changes generate no new warnings
  • Any dependent changes have been merged and published

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@mkmeral mkmeral marked this pull request as draft October 2, 2025 13:07
@mkmeral mkmeral marked this pull request as ready for review October 3, 2025 10:32
    Yields:
        Dictionary events containing graph execution information including:
        - MultiAgentNodeStartEvent: When a node begins execution

Member

These types aren't exposed to customers, so we should either remove these docs or document the shape of the dictionaries being emited

Member

Can you also add the new events to the PR description (similar to #788), along with the signatures of the new APIs being added? This will help the PR be more akin to a spec of what's being proposed.

Member

so we should either remove these docs AND document the shape of the dictionaries being emitted*

Contributor Author

I have matched the implementation/documentation to Agent. I will add additional docs as a follow-up CR on the docs repo. I will also update the PR description to explain the events emitted.

    try:
        event = await asyncio.wait_for(async_generator.__anext__(), timeout=timeout)
        yield event
    except StopAsyncIteration:

Member

Is this always thrown at the end and thus part of normal execution?

Member

nit: seems like there might be a more Pythonic way to do this without the while

            async with asyncio.timeout(timeout):
                async for event in async_generator:
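
For context on the two comments above, here is a minimal sketch of a per-event timeout wrapper. It is not the PR's code: the function name, the None-timeout branch, and raising the built-in TimeoutError are assumptions. It shows that StopAsyncIteration is the normal end-of-iteration signal raised by `__anext__()`. It also shows the semantic difference between the two forms: `asyncio.wait_for` around each `__anext__()` bounds the wait for every single event, whereas one `asyncio.timeout(...)` block (Python 3.11+) around the whole `async for` bounds the total duration of the loop.

```python
import asyncio
from typing import Any, AsyncIterator, Optional


async def stream_with_timeout(
    source: AsyncIterator[Any], timeout: Optional[float], message: str
) -> AsyncIterator[Any]:
    """Yield items from source; fail if any single item takes longer than timeout."""
    iterator = source.__aiter__()
    while True:
        try:
            if timeout is None:
                # No limit configured: wait for the next item indefinitely.
                event = await iterator.__anext__()
            else:
                event = await asyncio.wait_for(iterator.__anext__(), timeout=timeout)
        except StopAsyncIteration:
            # Normal termination: raised once the source is exhausted.
            return
        except asyncio.TimeoutError as exc:
            # Assumption: surface the caller's message via the built-in TimeoutError.
            raise TimeoutError(message) from exc
        yield event
```

Which form is preferable depends on whether the timeout is meant as a per-event limit or as a budget for the whole node execution.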

    # because a failure would throw exception and code would not make it here
    ready_nodes.extend(self._find_newly_ready_nodes(current_batch))

    async def _execute_nodes_parallel(

Member

Does this follow the pattern of how tools are executed in parallel - I know we worked out some wrinkles/bugs with that, so it'd be ideal to not have to redo those

Member

Also, does this have to be in this PR? Can we do the streaming then add parallel execution as a different commit to simplify the review?

Contributor Author

Parallel calls should be in this PR as graph already supports them.

W.r.t. bugs, I have updated the logic similarly to #954. Is there anything else?
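
To make the parallel-streaming discussion concrete, here is one common way to interleave events from several concurrently running async generators through a shared queue. It is a sketch only: `merge_streams`, `pump`, and the event shape are hypothetical, and this is not necessarily how the PR or the tool executor in #954 implements it.

```python
import asyncio
from typing import Any, AsyncIterator

_DONE = object()  # sentinel: one producer has finished


async def merge_streams(
    streams: dict[str, AsyncIterator[Any]]
) -> AsyncIterator[dict[str, Any]]:
    """Interleave events from several async iterators, tagged with their node id."""
    queue: asyncio.Queue = asyncio.Queue()  # unbounded, so put_nowait never fails

    async def pump(node_id: str, stream: AsyncIterator[Any]) -> None:
        try:
            async for event in stream:
                queue.put_nowait({"node_id": node_id, "event": event})
        except Exception as exc:
            queue.put_nowait(exc)  # forward the failure to the consumer
        finally:
            queue.put_nowait(_DONE)

    tasks = [asyncio.create_task(pump(nid, s)) for nid, s in streams.items()]
    remaining = len(tasks)
    try:
        while remaining:
            item = await queue.get()
            if item is _DONE:
                remaining -= 1
            elif isinstance(item, Exception):
                raise item
            else:
                yield item
    finally:
        # On the normal path every producer has already finished, so this is a
        # no-op; on failure or early exit it stops the remaining producers.
        for task in tasks:
            task.cancel()
```

Because each producer posts its sentinel as its last action, the consumer loop only exits once every producer is done, so an additional wait on the tasks after the loop has nothing left to wait for.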

    start_event = MultiAgentNodeStartEvent(
        node_id=node.node_id, node_type="agent" if isinstance(node.executor, Agent) else "multiagent"
    )
    yield start_event.as_dict()

Member

Could we do this at a higher level instead of in here? That way we can ensure this method is always returning TypedEvents

Member

Same for below

    wrapped_event = MultiAgentNodeStreamEvent(node.node_id, event)
    yield wrapped_event.as_dict()
    # Capture the final result event
    if "result" in event:

Member

Can we just do an isinstance check here?

Contributor Author

Not easily, because agent also translates it to a dict before returning the responses. https://github.com/strands-agents/sdk-python/blob/main/src/strands/agent/agent.py#L591

Is there a reason we decided to go this way instead of returning typed events?
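
The pattern the reviewers are asking for in this thread (typed events internally, plain dicts only at the public boundary) could look roughly like the sketch below. All class and function names here are hypothetical simplifications, not the definitions in src/strands/types/_events.py. It illustrates why isinstance checks become possible once conversion is deferred to the outermost layer.

```python
import asyncio
from typing import Any, AsyncIterator


class TypedEvent(dict):
    """Hypothetical base: a dict subclass, so events stay usable as dicts."""

    def as_dict(self) -> dict[str, Any]:
        return dict(self)


class NodeStartEvent(TypedEvent):
    def __init__(self, node_id: str) -> None:
        super().__init__(type="node_start", node_id=node_id)


class ResultEvent(TypedEvent):
    def __init__(self, result: Any) -> None:
        super().__init__(result=result)


async def _execute(node_ids: list[str]) -> AsyncIterator[TypedEvent]:
    """Internal generator: always yields TypedEvent instances."""
    for node_id in node_ids:
        yield NodeStartEvent(node_id)
    yield ResultEvent({"completed": list(node_ids)})


async def stream_async(node_ids: list[str]) -> AsyncIterator[dict[str, Any]]:
    """Public API: converts to plain dicts only at the boundary."""
    async for event in _execute(node_ids):
        yield event.as_dict()


async def invoke_async(node_ids: list[str]) -> Any:
    """Consumes the typed stream, so an isinstance check identifies the result."""
    result = None
    async for event in _execute(node_ids):
        if isinstance(event, ResultEvent):
            result = event["result"]
    return result
```

The trade-off is that anything consuming the public stream still sees dicts and has to fall back to key checks, which is the limitation raised above for events forwarded from Agent.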


    except Exception:
        logger.exception("node=<%s> | node execution failed", current_node.node_id)
    except Exception as e:

Member

Why can't we use an exception type for this? This seems hacky

    status=Status.EXECUTING,
    task=task,
    total_nodes=len(self.nodes),
    edges=[(edge.from_node, edge.to_node) for edge in self.edges],

Member

Not related to this PR, but why doesn't GraphState take Iterable[GraphEdge] instead of edges: list[Tuple["GraphNode", "GraphNode"]] = field(default_factory=list)?

Did GraphEdge come later?

        self, async_generator: AsyncIterator[dict[str, Any]], timeout: float, timeout_message: str
    ) -> AsyncIterator[dict[str, Any]]:
        """Wrap an async generator with timeout functionality."""
        while True:

Member

same nit as in graph

        logger.exception("node=<%s> | node execution failed", current_node.node_id)
    except Exception as e:
        # Check if this is a timeout exception
        if "timed out after" in str(e):

Member

nit: can we create a variable for this so we don't accidentally change the message in the exception?
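
Both this nit and the earlier "exception type" question point at the same alternative: classify timeouts by type rather than by matching message text. A minimal sketch follows, assuming a hypothetical NodeTimeoutError class and helper functions that are not part of the PR.

```python
import asyncio
from typing import Any, Awaitable


class NodeTimeoutError(Exception):
    """Hypothetical exception type for a node that exceeded its time budget."""

    def __init__(self, node_id: str, timeout: float) -> None:
        super().__init__(f"node '{node_id}' timed out after {timeout}s")
        self.node_id = node_id
        self.timeout = timeout


async def run_node(node_id: str, work: Awaitable[Any], timeout: float) -> Any:
    """Run one unit of work, converting asyncio's timeout into the typed error."""
    try:
        return await asyncio.wait_for(work, timeout=timeout)
    except asyncio.TimeoutError as exc:
        raise NodeTimeoutError(node_id, timeout) from exc


async def execute(node_id: str, work: Awaitable[Any], timeout: float) -> tuple:
    """Classify the outcome by exception type instead of by message content."""
    try:
        return "completed", await run_node(node_id, work, timeout)
    except NodeTimeoutError as exc:
        return "timeout", str(exc)
    except Exception as exc:
        return "failed", str(exc)
```

With a dedicated type, the wording of the message can change freely without affecting how the failure is handled.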

Murat Kaan Meral added 2 commits October 10, 2025 13:07

- Update docstrings to match Agent's minimal style (use dict keys instead of class names)
- Add isinstance checks for result event detection for type safety
- Improve _stream_with_timeout to handle None timeout case
- Add MultiAgentResultEvent for consistency with Agent pattern
- Yield TypedEvent objects internally, convert to dict at API boundary
- All 154 tests passing

- Remove unnecessary asyncio.gather() after event loop completion
- Same issue as tool executor PR strands-agents#954
- By the time loop exits, all tasks have already completed
- Gather was waiting for already-finished tasks (no-op)
- All 154 tests passing

codecov bot commented Oct 10, 2025

Codecov Report

❌ Patch coverage is 90.32258% with 15 lines in your changes missing coverage. Please review.

Files with missing lines           Patch %   Lines
src/strands/multiagent/graph.py    91.95%    2 Missing and 5 partials ⚠️
src/strands/multiagent/swarm.py    87.75%    4 Missing and 2 partials ⚠️
src/strands/multiagent/base.py     50.00%    2 Missing ⚠️
